package gu.sql2java.observer;

import gu.simplemq.Channel;
import gu.simplemq.ISubscriber;
import gu.simplemq.exceptions.SmqUnsubscribeException;
import gu.simplemq.json.BaseJsonEncoder;
import gu.sql2java.BaseBean;
import gu.sql2java.ListenerContainer;
import gu.sql2java.ListenerContainer.FireType;
import gu.sql2java.RowMetaData;
import gu.sql2java.TableListener;
import gu.sql2java.UnnameRow;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.util.TypeUtils;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
import static gu.sql2java.observer.JDBCUtility.parseTablenme;



/**
 * Message-queue based observer (listener dispatcher) for table row changes.
 * <p>
 * Each registered table gets a {@link TableObserverChannel} subscribed on the topic
 * {@code TOPIC_PREFIX + qualifiedName}. Messages received on that channel are decoded
 * into row beans and forwarded to the table's {@link ListenerContainer}.
 * @author guyadong
 *
 */
public class RowObserver {
	/**
	 * Single-threaded pool on which listeners are fired, so that messages are processed
	 * one at a time off the subscriber's thread. Wrapped with Guava's
	 * {@code MoreExecutors.getExitingExecutorService}.
	 */
	private static final ExecutorService executor = MoreExecutors.getExitingExecutorService(
			new ThreadPoolExecutor(1, 1,
					0L, TimeUnit.MILLISECONDS,
					new LinkedBlockingQueue<Runnable>(),
					new ThreadFactoryBuilder().setNameFormat("sql2java-row-observer-%d").build()));
	/** Subscriber on which table channels are registered. */
	private final ISubscriber subscriber;
	/** Listener containers keyed by qualified table name. */
	private final ConcurrentHashMap<String,ListenerContainer<BaseBean>> containers 	= new ConcurrentHashMap<>(16);
	/** Prefix of every channel name created by this class. */
	public static final String TOPIC_PREFIX = "ROWOVS/";
	/** JSON field holding the trigger type. */
	private static final String JN_TRIGGER_TYPE = "triggerType";
	/** JSON field describing how the {@code row} field stores its data (only JSON is accepted). */
	private static final String JN_STORE_TYPE = "rowType";
	/** JSON field holding the row payload. */
	private static final String JN_ROW = "row";
	/** Field of the row payload holding the old record. */
	private static final String JN_OLD = "OLD";
	/** Field of the row payload holding the new record. */
	private static final String JN_NEW = "NEW";

	/**
	 * @param subscriber message subscriber used to register table channels, must not be {@code null}
	 * @throws NullPointerException if {@code subscriber} is {@code null}
	 */
	public RowObserver(ISubscriber subscriber) {
		super();
		this.subscriber = checkNotNull(subscriber,"subscriber is null");
	}

	/**
	 * Register the listener container for the table named by {@code qualifiedName}.
	 * @param qualifiedName table name in the form {@code [$dbhostname/]$schema.$tablename};
	 * {@code dbhostname} is the database host name, used in the channel name to tell database
	 * hosts apart when several of them are connected to the same message system, may be omitted
	 * @param container listener container that receives the row notifications, must not be {@code null}
	 */
	public void register(String qualifiedName, ListenerContainer<BaseBean> container){
		containers.put(qualifiedName, checkNotNull(container, "container is null"));
		subscriber.register(new TableObserverChannel(qualifiedName, container));
	}
	/**
	 * Register a listener container for the table named by {@code qualifiedName};
	 * intended for tables that have no sql2java generated code.
	 * @param qualifiedName table name in the form {@code [$dbhostname/]$schema.$tablename};
	 * {@code dbhostname} is the database host name, used in the channel name to tell database
	 * hosts apart when several of them are connected to the same message system, may be omitted
	 * @param rowMetaData meta data describing the table's rows
	 * @return the registered channel; add your own listeners to it to receive row notifications
	 * @throws IllegalArgumentException if the table already has meta data defined in {@link RowMetaData}
	 */
	@SuppressWarnings({ "unchecked", "rawtypes" })
	public TableObserverChannel register(String qualifiedName, SimpleRowMetaData rowMetaData){
		checkArgument(null == RowMetaData.getMetaDataUnchecked(parseTablenme(checkNotNull(qualifiedName,"schematable is null"))),
				"schematable:%s be defined in RowMetaData,cannot use the method",qualifiedName);
		// reuse the container already registered under this name, if any
		containers.putIfAbsent(qualifiedName, new ListenerContainer(false,FireType.ROW_OBSERVER));
		ListenerContainer container = containers.get(qualifiedName);
		TableObserverChannel channel = new TableObserverChannel(qualifiedName, container,rowMetaData);
		subscriber.register(channel);
		return channel;
	}
	/**
	 * Channel implementation that turns row-change messages into listener calls.
	 * @author guyadong
	 *
	 */
	public static class TableObserverChannel extends Channel<String>{
		private final ListenerContainer<BaseBean> container;
		private final BaseJsonEncoder jsonEncoder;
		private final RowMetaData rowMetaData;
		/**
		 * Create a channel whose row meta data is looked up from {@link RowMetaData} by table name.
		 * @param schematable qualified table name, appended to {@link RowObserver#TOPIC_PREFIX} to form the channel name
		 * @param container listener container to fire
		 */
		protected TableObserverChannel(String schematable, ListenerContainer<BaseBean> container) {
			this(schematable, container, RowMetaData.getMetaData(parseTablenme(schematable)));
		}
		/**
		 * @param schematable qualified table name, appended to {@link RowObserver#TOPIC_PREFIX} to form the channel name
		 * @param container listener container to fire
		 * @param rowMetaData meta data used to decode row payloads
		 * @throws NullPointerException if any argument is {@code null}
		 */
		protected TableObserverChannel(String schematable, ListenerContainer<BaseBean> container, RowMetaData rowMetaData) {
			super(TOPIC_PREFIX + checkNotNull(schematable,"schematable is null"));
			this.container = checkNotNull(container,"container is null");
			this.jsonEncoder = BaseJsonEncoder.getEncoder();
			this.rowMetaData = checkNotNull(rowMetaData,"rowMetaData is null");
		}
		/**
		 * Decode the record stored as a JSON string in field {@code name} of {@code row}.
		 * @param row row payload of the message
		 * @param name field to read ({@code OLD} or {@code NEW})
		 * @return the decoded bean with its modified state reset, or {@code null} if the field is absent
		 */
		private BaseBean parseBean(JSONObject row,String name)
		{
			String json = row.getObject(name, String.class);
			if(null == json){
				return null;
			}
			JSONObject jsonObject = jsonEncoder.fromJson(json, JSONObject.class);
			BaseBean bean;
			if(UnnameRow.class.equals(rowMetaData.beanType)){
				bean = new UnnameRow(rowMetaData);
			}else {
				try {
					bean = rowMetaData.beanType.newInstance();
				} catch (Exception e) {
					Throwables.throwIfUnchecked(e);
					throw new RuntimeException(e);
				}
			}
			for(int i = 0; i<rowMetaData.columnCount; ++i){
				Object origin = jsonObject.get(rowMetaData.columnNames.get(i));
				// columns missing from the JSON (or null) are left untouched
				if(null != origin){
					Object value=TypeUtils.cast(origin,	rowMetaData.columnTypes.get(i), null);
					bean.setValue(i, value);
				}
			}
			// setValue calls above must not count as modifications
			bean.resetIsModified();
			return bean;
		}
		/**
		 * Decode one message and fire the matching listener method on the container.
		 * Any exception raised while decoding or firing is logged and not propagated.
		 * @param t message text: a JSON object with fields {@code triggerType}, {@code rowType} and {@code row}
		 */
		private void doOnSubscribe(String t) {
			try {
				JSONObject jsonObject = JSON.parseObject(t, JSONObject.class);
				TriggerType triggerType = jsonObject.getObject(JN_TRIGGER_TYPE, TriggerType.class);
				checkNotNull(triggerType,"triggerType field is null");
				StoreType storeType = jsonObject.getObject(JN_STORE_TYPE, StoreType.class);
				checkArgument(StoreType.JSON.equals(storeType), "UNSUPPORTED storeType %s", storeType);
				JSONObject row = jsonObject.getObject(JN_ROW, JSONObject.class);
				// fail with a descriptive message instead of a bare NPE inside parseBean
				checkNotNull(row,"row field is null");
				BaseBean oldBean = parseBean(row, JN_OLD);
				BaseBean newBean = parseBean(row, JN_NEW);
				triggerType.checkRow(oldBean, newBean);
				if(triggerType.name().endsWith("_UPDATE") ){
					// flag every column whose value differs between the old and the new record
					// NOTE(review): (1<<i) wraps for column index >= 32; confirm tables never exceed 32 columns
					int modified = 0;
					for(int i =0; i < rowMetaData.columnCount; ++i)
					{
						if(!Objects.equal(oldBean.getValue(i), newBean.getValue(i))){
							modified |= (1<<i);
						}
					}
					newBean.setModified(modified);
				}
				// set firer as ROW_OBSERVER
				container.firer(FireType.ROW_OBSERVER);
				switch (triggerType) {
				case AFTER_DELETE:
					container.afterDelete(oldBean);
					break;
				case BEFORE_DELETE:
					container.beforeDelete(oldBean);
					break;
				case AFTER_INSERT:
					container.afterInsert(newBean);
					break;
				case BEFORE_INSERT:
					container.beforeInsert(newBean);
					break;
				case AFTER_UPDATE:
					// pass the new record: it is the one carrying the modified flags computed above
					container.afterUpdate(newBean);
					break;
				case BEFORE_UPDATE:
					container.beforeUpdate(newBean);
					break;
				default:
					throw new IllegalArgumentException("UNSUPPORTED TriggerType " + triggerType.toString());
				}
			} catch (Exception e) {
				logger.info(e.getMessage(),e);
			} finally {
				container.clearFirer();
			}
		}
		/**
		 * Hand the received message over to the shared executor so that listeners
		 * are fired asynchronously.
		 * @param t message text
		 */
		@Override
		public void onSubscribe(final String t) throws SmqUnsubscribeException {
			// fire listeners asynchronously
			executor.execute(new Runnable() {

				@Override
				public void run() {
					doOnSubscribe(t);
				}
			});
		}
		/**
		 * Register a listener.
		 * @param listener listener to add
		 * @return {@code true} if registered, {@code false} if {@code listener} is {@code null} or already registered
		 */
		public boolean registerListener(TableListener<BaseBean> listener) {
			return container.add(listener);
		}
		/**
		 * Remove a listener.
		 * @param listener listener to remove
		 * @return the result of the underlying container's {@code remove} call
		 */
		public boolean remove(TableListener<BaseBean> listener) {
			return container.remove(listener);
		}

	}
}
